package com.atuguigu.flink.Day02.window;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Use a full-window function to compute the maximum and minimum temperature within each window
public class Example5 {
    /**
     * Builds and runs the streaming job.
     *
     * <p>Readings from {@code SensorSource} are restricted to the sensor whose id is
     * {@code "sensor_1"}, keyed by sensor id, grouped into 5-second tumbling
     * processing-time windows, reduced by {@link WindowResule}, and printed.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with a single parallel instance.
        env.setParallelism(1);

        DataStreamSource<SendsorReading> readings = env.addSource(new SensorSource());

        readings
                .filter(reading -> reading.id.equals("sensor_1"))
                .keyBy(reading -> reading.id)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new WindowResule())
                .print();

        env.execute();
    }
    // Type parameters of ProcessWindowFunction<IN, OUT, KEY, W extends Window>: input, output, key, window.
    // A full-window function is written by extending ProcessWindowFunction.
    /**
     * Full-window function that reports the lowest and highest temperature among
     * the readings handed to it for one key and one time window.
     *
     * <p>Type arguments: input {@code SendsorReading}, output
     * {@code MinMaxTempPerWindow}, key {@code String}, window {@code TimeWindow}.
     */
    public static class WindowResule extends ProcessWindowFunction<SendsorReading, MinMaxTempPerWindow, String, TimeWindow> {

        /**
         * Scans every reading of the window, tracks the minimum and maximum
         * temperature, and emits a single {@code MinMaxTempPerWindow} carrying the
         * key and the window's start and end timestamps. Per the original author's
         * note, this is called when the window closes.
         *
         * @param s         key of the window (the sensor id used in {@code keyBy})
         * @param context   gives access to the window whose bounds are reported
         * @param iterable  all readings collected for this key and window
         * @param collector receives the single result record
         * @throws Exception declared by the overridden method; not thrown explicitly here
         */
        @Override
        public void process(String s, Context context, Iterable<SendsorReading> iterable, Collector<MinMaxTempPerWindow> collector) throws Exception {
            // Seed the running minimum with +infinity and the running maximum with
            // -infinity so that the first real reading replaces both.
            // (Double.MIN_VALUE is the smallest POSITIVE double, not the most
            // negative one, so it must not be used as a seed here.)
            double minTemp = Double.POSITIVE_INFINITY;
            double maxTemp = Double.NEGATIVE_INFINITY;
            for (SendsorReading e : iterable) {
                if (e.temperture < minTemp) {
                    minTemp = e.temperture;
                }
                if (e.temperture > maxTemp) {
                    maxTemp = e.temperture;
                }
            }
            long windowStart = context.window().getStart();
            long windowEnd = context.window().getEnd();
            collector.collect(new MinMaxTempPerWindow(s, minTemp, maxTemp, windowStart, windowEnd));
        }
    }

    /**
     * Result record holding the minimum and maximum temperature of one sensor
     * together with the start and end timestamps of the window they belong to.
     *
     * <p>All fields are public and mutable; field names are kept exactly as
     * declared because outside code may read them.
     */
    public static class MinMaxTempPerWindow {
        /** Sensor id (the window key). */
        public String id;
        /** Lowest temperature in the window. */
        public Double minTemp;
        /** Highest temperature in the window. */
        public Double maxTemp;
        /** Window start timestamp. */
        public Long WindowStrat;
        /** Window end timestamp. */
        public Long WindowEnd;

        /**
         * Creates an instance with every field left {@code null}.
         * NOTE(review): presumably kept so Flink can treat this class as a POJO — confirm.
         */
        public MinMaxTempPerWindow() {
        }

        /**
         * Creates a fully populated record.
         *
         * @param id          sensor id
         * @param minTemp     lowest temperature in the window
         * @param maxTemp     highest temperature in the window
         * @param windowStrat window start timestamp
         * @param windowEnd   window end timestamp
         */
        public MinMaxTempPerWindow(String id, Double minTemp, Double maxTemp, Long windowStrat, Long windowEnd) {
            this.id = id;
            this.minTemp = minTemp;
            this.maxTemp = maxTemp;
            this.WindowStrat = windowStrat;
            this.WindowEnd = windowEnd;
        }

        /**
         * Renders all five fields as
         * {@code MinMaxTempPerWindow{id='…', minTemp=…, maxTemp=…, WindowStrat=…, WindowEnd=…}}.
         *
         * @return the textual form of this record; {@code null} fields print as {@code null}
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("MinMaxTempPerWindow{");
            text.append("id='").append(id).append('\'');
            text.append(", minTemp=").append(minTemp);
            text.append(", maxTemp=").append(maxTemp);
            text.append(", WindowStrat=").append(WindowStrat);
            text.append(", WindowEnd=").append(WindowEnd);
            text.append('}');
            return text.toString();
        }
    }
}
